
[SPARK-17993][SQL] Fix Parquet log output redirection #15538

Closed

Conversation

mallman
Contributor

@mallman mallman commented Oct 18, 2016

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)

What changes were proposed in this pull request?

PR #14690 broke parquet log output redirection for converted partitioned Hive tables. For example, when querying parquet files written by Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from the Parquet reader:

```
Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

This only happens during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (versions < 1.9) doesn't use slf4j for logging. Note, you can tell that log redirection is not working here because the log message format does not conform to the default Spark log message format.

This is a regression I noted as something we needed to fix as a follow up.

It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion. That call is what triggered the output redirection.
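For context, the redirection in question boils down to bridging Parquet's java.util.logging output into SLF4J. A minimal sketch of that idea using the jul-to-slf4j bridge (illustrative only, not the exact Spark code; the class name is hypothetical and jul-to-slf4j must be on the classpath):

```
import java.util.logging.Handler;
import java.util.logging.Logger;

import org.slf4j.bridge.SLF4JBridgeHandler;

// Illustrative sketch: route Parquet's java.util.logging output into SLF4J
// so that Spark's logging configuration controls its format and threshold.
final class ParquetJulToSlf4j {
  static void redirect() {
    // Parquet (< 1.9) logs through java.util.logging under this namespace.
    Logger parquetLogger = Logger.getLogger("org.apache.parquet");
    // Drop any console handlers Parquet installed on its own logger.
    for (Handler handler : parquetLogger.getHandlers()) {
      parquetLogger.removeHandler(handler);
    }
    // Let records propagate up to the root logger, where the bridge listens.
    parquetLogger.setUseParentHandlers(true);
    SLF4JBridgeHandler.removeHandlersForRootLogger();
    SLF4JBridgeHandler.install();
  }
}
```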

How was this patch tested?

I tested this manually in four ways:

  1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`.
  2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0.
  3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0.
  4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0.

I ran each test with a new instance of `spark-shell` or `spark-sql`.

Incidentally, I found that test case 3 was not a regression—redirection was not occurring in the master codebase prior to #14690.

I spent some time working on a unit test, but based on my experience working on this ticket I feel that automated testing here is far from feasible.

cc @ericl @dongjoon-hyun

@SparkQA

SparkQA commented Oct 19, 2016

Test build #67150 has finished for PR 15538 at commit 6101b83.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -56,6 +56,7 @@ class ParquetFileFormat
     with DataSourceRegister
     with Logging
     with Serializable {
+  ParquetFileFormat.redirectParquetLogs
Contributor

Should we only invoke this once per jvm?

Contributor Author

Essentially, yes.

Contributor Author

The tricky bits are when, where and how.

Contributor

If you moved this into an object that was referenced by this class on init, would that suffice?

Contributor Author

I'm not sure I understand what you're suggesting. Can you elaborate further?

Member

You should just be able to put this in the class init for ParquetFileFormat, by adding this to the body of object ParquetFileFormat below.

   */
-  def redirectParquetLogs(): Unit = {}
+  private def redirectParquetLogs(): Unit = {}
Member

Probably a dumb question, but this is a no-op? How does this work?

Contributor Author

If I just call `redirectParquetLogsViaSLF4J` I get a compilation error:

```
[error] [warn] /Volumes/VideoAmpCS/msa/workspace/spark-master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:59: a pure expression does nothing in statement position; you may be omitting necessary parentheses
[error] [warn]   ParquetFileFormat.redirectParquetLogsViaSLF4J
```

Member

Yes, because it's not a method. I'm wondering what this method does, and I think it exists only to cause the object to initialize. That's awkward, but it already exists I guess. Is there really no way to implement a static init block in Scala? hm. If so then I support this change. It's actually being moved to a spot where it will be called earlier, and, be called fewer times, right?

Contributor Author

I don't believe Scala classes have static initializers. That would be great here.

Moving this method call into the constructor ensures it's called no more times than it would be if it were in a method.

Contributor

Maybe it would be clearer to call it once with an atomic boolean guard:

```
if (ParquetFileFormat.logsRedirected.compareAndSet(false, true)) {
  ParquetFileFormat.redirectParquetLogsViaSLF4J()
}
```

Then you could test that the flag is set after constructing a ParquetFileFormat. That's obviously not a great test but probably good enough.
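Spelled out as a self-contained Java sketch of that guard (names hypothetical):

```
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the suggested guard: the redirection callback runs at most
// once per JVM, however many instances are constructed.
final class RedirectOnce {
  private static final AtomicBoolean logsRedirected = new AtomicBoolean(false);

  // Called from every ParquetFileFormat constructor; only the first caller
  // wins the compare-and-set, so the body executes exactly once.
  static void ensure(Runnable redirect) {
    if (logsRedirected.compareAndSet(false, true)) {
      redirect.run();
    }
  }
}
```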

Contributor Author

I'll try to clean things up in another commit. If nothing else, it's clear I need to add some code comments.

@mallman
Contributor Author

mallman commented Oct 19, 2016

I could use some advice on writing a unit test for this. Do you guys know if there is a precedent in the codebase that covers a situation like this? I'd like to reuse existing code if possible.

@mallman
Contributor Author

mallman commented Oct 20, 2016

I pushed a commit to improve the documentation. I also removed a couple of unused imports (boy scout rule).

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67279 has finished for PR 15538 at commit 099afca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

   */
-  def redirectParquetLogs(): Unit = {}
+  private def ensureParquetLogRedirection(): Unit = {}
Member

OK this is better, though on a second look, it seems less indirect to either:

a) use a lazy val above and simply do anything with that reference that compiles, like log it or something (it didn't work before because just referencing a field and doing nothing else causes a warning), or

b) make it a method that executes once with a simple AtomicBoolean guard or boolean + synchronization

I suppose I'm wondering whether even this mechanism is guaranteed to work.

Contributor Author

I appreciate your diligent feedback, and I agree there are a few different ways to implement this. However, I truly believe what I've got here is at least as good as the other options. Unless you object, I'd like to keep the approach I've taken here as-is.

Member

I suppose it's no worse than the existing setup, though surely there's a better way to get once-per-class execution. Would you try one more thing:

```
private def ensureParquetLogRedirection(): Unit = redirectParquetLogsViaSLF4J
```

That would at least connect the things that are supposed to be connected, though it's all a no-op and would look less wonky to me.

If that still generates a warning OK let's merge this.

Contributor Author

I believe the log output redirection is working. The mystery here is why the test log level threshold is not having an effect in Jenkins.

Nevertheless I'll make the change and push it.

@mallman
Contributor Author

mallman commented Oct 24, 2016

I've spent a couple hours today working on a unit test which captures stdout during a parquet write operation to validate that it has no parquet logging output. I haven't got it working yet, but I'll spend some more time on it tomorrow.

@ericl
Contributor

ericl commented Oct 25, 2016

also LGTM as-is; it's polluting test output which is kind of annoying. Testing the log output might be a little overkill since this is not critical functionality and upstream has a fix pending release.

@mallman
Contributor Author

mallman commented Oct 25, 2016

@ericl What do you mean it's polluting test output?

@ericl
Contributor

ericl commented Oct 25, 2016

I see ~10k lines of "CorruptStatistics" stack traces in the Jenkins log. Though it also seems to show up in the test logs for this PR, so maybe it is unrelated?

@mallman
Contributor Author

mallman commented Oct 25, 2016

I'll take a closer look at that.

@mallman
Contributor Author

mallman commented Oct 26, 2016

The "CorruptStatistics" stack traces (which I agree are really annoying) are being logged because parquet logs them at the WARN level, and Spark's default logging threshold when running tests is WARN.

I've pushed a commit to set the parquet log threshold to ERROR for testing. This is actually the default configuration for a Spark deployment: https://github.com/apache/spark/blob/v2.0.1/conf/log4j.properties.template#L35.
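For reference, the relevant lines of that template look roughly like this (quoted from memory of the linked file; check the link for the exact wording):

```
# Parquet related logging
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
```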

@SparkQA

SparkQA commented Oct 26, 2016

Test build #67542 has finished for PR 15538 at commit dec65c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Oct 26, 2016

I like this test failure:

org.apache.spark.sql.sources.CreateTableAsSelectSuite.(It is not a test)

Anyway, I don't think this is related to this PR.

@mallman
Contributor Author

mallman commented Oct 26, 2016

So raising the log threshold looks like it didn't do anything for Jenkins, but when I run the tests locally it does just the trick. sigh

Anyway, might as well push a rebase and see what happens.

@mallman mallman force-pushed the spark-17993-fix_parquet_log_redirection branch from dec65c7 to 02df8c2 Compare October 26, 2016 04:02
@SparkQA

SparkQA commented Oct 26, 2016

Test build #67552 has finished for PR 15538 at commit 02df8c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Oct 26, 2016

I'm still seeing the torrent of CorruptStatistics errors in the Jenkins build log, even though I don't see them running the tests locally with sbt. Maybe it's a maven versus sbt build issue.

Anyway, having spent a few hours trying to get a reliable, effective unit test for this functionality, I don't think it's going to happen.

Can you guys let me know if you have a blocking concern around this PR? Otherwise, can we please merge this to master? I'm reluctant to spend more time on this.

Cheers.

@SparkQA

SparkQA commented Oct 27, 2016

Test build #67658 has finished for PR 15538 at commit 1fc3c93.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Oct 27, 2016

Looks like the test failed for reasons unrelated to this PR. Can someone trigger a retest, please?

@ericl
Contributor

ericl commented Oct 27, 2016

Jenkins retest this please


@srowen
Member

srowen commented Oct 28, 2016

Jenkins add to whitelist

@SparkQA

SparkQA commented Oct 28, 2016

Test build #3377 has finished for PR 15538 at commit 1fc3c93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 28, 2016

Test build #67707 has finished for PR 15538 at commit 1fc3c93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen srowen left a comment

Great, if this still works, this looks more understandable to me.

@mallman
Contributor Author

mallman commented Oct 29, 2016

@srowen I ran my manual tests for this build and they worked as expected. Can you merge this PR?

@mallman
Contributor Author

mallman commented Oct 29, 2016

Actually, this may not be working for remote executors. I tested this patch running in local mode, but in running a version of this with actual remote executors I'm seeing the original parquet log output.

Let me take a closer look before merging this PR.

@srowen
Member

srowen commented Oct 30, 2016

OK, well whatever works here, even if somehow the previous version is what's needed to get it to work.

@mallman mallman force-pushed the spark-17993-fix_parquet_log_redirection branch from e544397 to 7f858c6 Compare October 30, 2016 22:08
@mallman
Contributor Author

mallman commented Nov 7, 2016

@rxin I'm not working on the Parquet upgrade this week. I think we'll have to punt on it.

@SparkQA

SparkQA commented Nov 7, 2016

Test build #68289 has finished for PR 15538 at commit 34e3997.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Nov 7, 2016

> Yeah I see, but this is getting to be quite hacky just to turn off log messages

This isn't just a few annoying log messages. This is an avalanche of log messages, each of which contains a deep stack trace.

You can read https://issues.apache.org/jira/browse/SPARK-17993?focusedCommentId=15645322&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15645322 for another perspective.

I agree this patch is ugly. I don't like writing code like this. I just don't see a feasible alternative for Spark 2.1. It can be removed as part of a future Parquet 1.9 upgrade.

I abbreviated the console output when I created SPARK-17993. Running a spark-sql query with 10 results on a table written with Spark 1.5 results in 7,052 lines of console output. (The last 10 are the actual results.)

@srowen
Member

srowen commented Nov 7, 2016

OK, maybe. It is not just about being hacky but about being likely to break. If this really calls for a static initializer, how about a Java helper class to do the work? How about piggybacking on the Logging init routines? I don't see this as something without alternatives that needs to be pushed through.

@mallman
Contributor Author

mallman commented Nov 7, 2016

Sorry, I don't see how Java code will make this patch any better. We're not really missing a static initializer. We just need some initializer to run early enough.

If we can tolerate putting parquet-specific code in datasource-agnostic code, there are a number of places where we can put a call to the log redirection code. I've tried to avoid putting parquet-specific code outside the parquet-specific codebase.

@srowen
Member

srowen commented Nov 7, 2016

If ParquetFileFormat had a static init block somehow, we'd be done right? because the logging config in that static initializer would have to execute, once, during classloading and therefore before any usage in a constructor or serialization. But per above, you can't write such a block in Scala. So, make a dummy ParquetLogHelper.java class, and make ParquetFileFormat have a dummy field of its type or something. ParquetLogHelper can have a static init, and must load when ParquetFileFormat loads. I might miss a reason that doesn't work, but seems like the sort of thing to rule out before settling on a complicated solution. It would be a little simpler than the current code.

I don't think it's so wrong to add logic directly to Logging.scala as a fairly temporary workaround. It wouldn't entail an actual compile or runtime dependency on Parquet in core. At least for me that seems simpler than the heroic attempt to intercept serialization and constructor paths. I'd hope it's sufficient to add it to Logging, which should have to init very early, everywhere, once.
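To make the suggestion concrete, a hypothetical sketch of that ParquetLogHelper idea (the names and the dummy-field trick are illustrative):

```
// Hypothetical sketch of the suggested helper: the static initializer runs
// exactly once, at class-load time, before any constructor call that
// references this class.
public final class ParquetLogHelper {
  static {
    // redirect Parquet's java.util.logging output to SLF4J here
  }

  // A dummy field of this type in ParquetFileFormat forces this class to
  // load, and hence the block above to run, wherever the format is used.
  public static final ParquetLogHelper INIT = new ParquetLogHelper();

  private ParquetLogHelper() {}
}
```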

@mallman
Contributor Author

mallman commented Nov 8, 2016

I'll work on a revision and try to push something today.

Michael Allman added 7 commits November 9, 2016 14:11

  • …constructing an instance of `ParquetFileFormat`. Before, it was occurring as part of the call to `inferSchema` and in `prepareWrite`, however not all Parquet access occurs through one of those methods. We add this redirection to the constructor to ensure that any instantiation of `ParquetFileFormat` triggers log redirection if it hasn't already happened
  • …redirectParquetLogsViaSLF4J
  • …ParquetLogRedirector which performs the redirection as part of its class initialization
@mallman mallman force-pushed the spark-17993-fix_parquet_log_redirection branch from 34e3997 to 247ef91 Compare November 9, 2016 22:12
@mallman
Contributor Author

mallman commented Nov 9, 2016

Pushed a new version which I think is cleaner than before. I tested all 8 scenarios manually.

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68424 has finished for PR 15538 at commit 247ef91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • final class ParquetLogRedirector implements Serializable

@srowen
Member

srowen commented Nov 10, 2016

I like this. It centralizes and yet elaborates the handling of the issue, does seem to be the right mechanism to use, and I presume, does work reliably.
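In outline, the merged mechanism works along these lines (a simplified sketch, not the verbatim patch):

```
import java.io.Serializable;

// Simplified sketch of the merged approach: redirection lives in a class
// initializer, so it runs once per JVM, whenever this class first loads.
final class ParquetLogRedirector implements Serializable {
  static {
    // redirect Parquet's java.util.logging output to SLF4J here
  }

  // Holders keep a reference in an instance field: constructing a
  // ParquetFileFormat on the driver, or deserializing one on an executor,
  // loads this class and fires the static block above.
  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();

  private ParquetLogRedirector() {}
}
```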

@mallman
Contributor Author

mallman commented Nov 10, 2016

@ericl Does this LGTY?

@rxin
Contributor

rxin commented Nov 10, 2016

I don't think we are going to upgrade Parquet for branch-2.1, since it's way past that point. Let's merge this for 2.1.

Merging in master/branch-2.1.

asfgit pushed a commit that referenced this pull request Nov 10, 2016
Author: Michael Allman <[email protected]>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.

(cherry picked from commit b533fa2)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in b533fa2 Nov 10, 2016
@mallman mallman deleted the spark-17993-fix_parquet_log_redirection branch November 10, 2016 21:47
ghost pushed a commit to dbtsai/spark that referenced this pull request Jan 17, 2017
## What changes were proposed in this pull request?

Changing the default parquet logging levels to reflect the changes made in PR [apache#15538](apache#15538), in order to prevent the flood of log messages by default.

## How was this patch tested?

Default log output when reading from parquet 1.6 files was compared with and without this change. The change eliminates the extraneous logging and makes the output readable.

Author: Nick Lavers <[email protected]>

Closes apache#16580 from nicklavers/spark-19219-set_default_parquet_log_level.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017